Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk indexer util: Add a worker ID context value to flush events #647

Closed
wants to merge 2 commits into from

Conversation

DagW
Copy link

@DagW DagW commented Dec 10, 2024

Description

Describe what this change achieves.

This PR adds a context value to the flush event callbacks, to allow clients to determine which worker the callback comes from.
This allows detailed logging and rate per worker metrics, aswell as custom rate control.

	indexerConfig := opensearchutil.BulkIndexerConfig{
                .......
		OnFlushStart: b.onFlushStart,
		OnFlushEnd:   b.onFlushEnd,
	}

func (b *BulkWriter) onFlushStart(ctx context.Context) context.Context {
	log.Println("flush start by worker ID:", ctx.Value(opensearchutil.WorkerCtxKey))
	b.metrics.ObserveBulkIndexerFlush(ctx.Value(opensearchutil.WorkerCtxKey))
	return ctx
}

Issues Resolved

List any issues this PR will resolve, e.g. Closes [...].

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@DagW DagW changed the title Add a worker ID context value to flush events Bulk indexer util: Add a worker ID context value to flush events Dec 10, 2024
Copy link

codecov bot commented Dec 10, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 15.49%. Comparing base (06a6dc8) to head (2569d0c).
Report is 76 commits behind head on main.

❗ There is a different number of reports uploaded between BASE (06a6dc8) and HEAD (2569d0c). Click for more details.

HEAD has 1 upload less than BASE
Flag BASE (06a6dc8) HEAD (2569d0c)
integration 1 0
Additional details and impacted files
@@             Coverage Diff             @@
##             main     #647       +/-   ##
===========================================
- Coverage   57.29%   15.49%   -41.81%     
===========================================
  Files         315      316        +1     
  Lines        9823     9902       +79     
===========================================
- Hits         5628     1534     -4094     
- Misses       2902     8286     +5384     
+ Partials     1293       82     -1211     
Flag Coverage Δ
integration ?
unit 15.49% <100.00%> (+2.64%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
opensearchutil/bulk_indexer.go 76.56% <100.00%> (+2.75%) ⬆️

... and 300 files with indirect coverage changes

@DagW
Copy link
Author

DagW commented Dec 10, 2024

Closed, as I noticed you can return a context with new values

func (b *BulkWriter) onFlushStart(ctx context.Context) context.Context {
	uuid := uuid.NewString()
	fmt.Println("starting flush:", uuid)
	ctx = context.WithValue(ctx, WorkerCtxKey, uuid)
	b.metrics.ObserveBulkIndexerFlush()
	return ctx
}

@DagW DagW closed this Dec 10, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant